/**
 * <p>Title: AnnotationParser.java</p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2017</p>
 * <p>Company: www.zto.com</p>
 */
package com.zto.boot.hbase.annotation;

import com.zto.boot.hbase.configuration.HbaseData;
import com.zto.boot.hbase.exception.HBaseClientException;
import com.zto.boot.hbase.util.TypeUtil;
import org.hbase.async.DeleteRequest;
import org.hbase.async.GetRequest;
import org.hbase.async.KeyValue;
import org.hbase.async.PutRequest;
import org.springframework.core.convert.support.DefaultConversionService;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.*;

/**
 * <p>Class: AnnotationParser</p>
 * <p>Description: </p>
 * @author xiaowenke@zto.cn
 * @date 2018/5/19
 * @version 1.0
 */
public class AnnotationParser {

    /**
     * 解析put请求的实体
     * @param data 实体对象
     * @return PutRequest集合
     */
    public static <T> Set<PutRequest> parsePutRequest(T data) {

        if (data instanceof String || isWrapClass(data.getClass())) {
            throw HBaseClientException.exception("Hbase实体注解解析错误，传入的参数为基本数据类型或String");
        } else {

            byte[] tableName = getTableName(data.getClass());
            try {
                byte[] rowKey = stringToBytes(getRowkey(data));
                long version = getVersion(data);

                Set<HbaseData> hbaseDatas = getHbaseDatas(data);

                Set<PutRequest> puts = new HashSet<>();
                hbaseDatas.forEach( value -> {
                    puts.add(new PutRequest(
                                    tableName,
                                    rowKey,
                                    value.getFamily(),
                                    value.getQualifier(),
                                    value.getValue()
                            ));
                });
                return puts;
            } catch (IllegalAccessException e) {
                throw HBaseClientException.exception("Hbase实体注解解析未知错误");
            }
        }
    }

    /**
     * 解析Delete请求的实体
     *
     * @param clazz  实体Class
     * @param rowKey hbase-rowKey
     * @return DELETE请求的对象
     */
    public static <T> DeleteRequest parseDeleteRequest(Class<T> clazz, String rowKey) {
        byte[] table = getTableName(clazz);
        DeleteRequest deleteRequest = new DeleteRequest(table, stringToBytes(rowKey));
        return deleteRequest;
    }

    /**
     * 解析GET请求的实体
     *
     * @param clazz  实体Class
     * @param rowKey hbase-rowKey
     * @return GET请求的对象
     */
    public static <T> GetRequest parseGetRequest(Class<T> clazz, String rowKey) {
        byte[] table = getTableName(clazz);
        GetRequest getRequest = new GetRequest(table, stringToBytes(rowKey));
        return getRequest;
    }

    /**
     * 解析返回查询的数据并响应
     *
     * @param clazz        映射的Class
     * @param cells 查询的结果集
     * @return 映射后的结果集
     * @throws IllegalAccessException
     * @throws InstantiationException
     */
    public static <T> T parseGetResponse(Class<T> clazz, List<KeyValue> cells)  throws IllegalAccessException, InstantiationException {
        
        if (cells == null || cells.isEmpty()) {
            return null;
        }
        T t = clazz.newInstance();
        Field[] declaredFields = t.getClass().getDeclaredFields();
        for (KeyValue cell : cells) {
            Field field = findField(declaredFields, cell);
            if (field != null) {

                if (field.getType().isAssignableFrom(Date.class)) {
                    long val = com.zto.boot.hbase.util.Bytes.toLong(cell.value());
                    field.setAccessible(true);
                    field.set(t, new Date(val));
                } else if (TypeUtil.isBooleanType(field)){
                    boolean val = com.zto.boot.hbase.util.Bytes.toBoolean(cell.value());
                    field.setAccessible(true);
                    field.set(t, val);
                } else if (TypeUtil.isShortType(field)){
                    short val = com.zto.boot.hbase.util.Bytes.toShort(cell.value());
                    field.setAccessible(true);
                    field.set(t, val);
                } else if (TypeUtil.isIntType(field)){
                    int val = com.zto.boot.hbase.util.Bytes.toInt(cell.value());
                    field.setAccessible(true);
                    field.set(t, val);
                } else if (TypeUtil.isLongType(field)){
                    long val = com.zto.boot.hbase.util.Bytes.toLong(cell.value());
                    field.setAccessible(true);
                    field.set(t, val);
                } else if (TypeUtil.isFloatType(field)){
                    float val = com.zto.boot.hbase.util.Bytes.toFloat(cell.value());
                    field.setAccessible(true);
                    field.set(t, val);
                } else if (TypeUtil.isDoubleType(field)){
                    double val = com.zto.boot.hbase.util.Bytes.toDouble(cell.value());
                    field.setAccessible(true);
                    field.set(t, val);
                } else if (TypeUtil.isBigDecimalType(field)){
                    BigDecimal val = com.zto.boot.hbase.util.Bytes.toBigDecimal(cell.value());
                    field.setAccessible(true);
                    field.set(t, val);
                } else{
                    String val = new String(cell.value());
                    DefaultConversionService conversionService = new DefaultConversionService();
                    if (conversionService.canConvert(String.class, field.getType())) {
                        Object convert = conversionService.convert(val, field.getType());
                        field.setAccessible(true);
                        field.set(t, convert);
                    }
                }
            }

        }

        Field versionField = findVersionField(declaredFields);
        if (versionField != null) {
            versionField.setAccessible(true);
            versionField.set(t, cells.get(0).timestamp());
        }

        Field rowKeyField = findRowKeyField(declaredFields);
        if (rowKeyField != null && rowKeyField.getType().isAssignableFrom(String.class)) {
            rowKeyField.setAccessible(true);
            rowKeyField.set(t, new String(cells.get(0).key()));
        }

        return t;
    }


    /**
     * 得到Hbase表名称
     *
     * @param clazz 目标类Class
     * @return 表名称
     */
    private static <T> byte[] getTableName(Class<T> clazz) {
        HTable hTable = clazz.getAnnotation(HTable.class);
        if (hTable == null) {
            throw HBaseClientException.exception("Hbase实体注解解析错误，请在类上注解@HTable");
        }

        return stringToBytes(hTable.name());

    }


    /**
     * 获取目标版本号
     *
     * @param entity 目标实体
     * @throws IllegalAccessException
     */
    private static <T> long getVersion(T entity) throws IllegalAccessException {
        int count = 0;
        long version = 0;
        Class<?> clazz = entity.getClass();
        Field[] fields = clazz.getDeclaredFields();

        for (Field field : fields) {
            HVersion annotation = field.getAnnotation(HVersion.class);
            if (annotation != null) {
                count++;
                field.setAccessible(true);
                Object v = field.get(entity);
                if (field.getType().isAssignableFrom(Long.class) || field.getType().isAssignableFrom(long.class)) {

                    version = (long) v;
                } else {
                    throw HBaseClientException.exception("Hbase实体注解解析错误，@Version注解的属性类型必须为Long");
                }
            }
        }

        if (count < 2) {
            return version;
        }
        throw HBaseClientException.exception("Hbase实体注解解析错误，@Version注解只能有一个");
    }

    /**
     * 查询@HVersion注解的属性对象Field
     *
     * @param declaredFields
     * @return 目标属性对象Field
     */
    private static Field findVersionField(Field[] declaredFields) {
        for (Field field : declaredFields) {
            HVersion version = field.getAnnotation(HVersion.class);
            if (version != null) {
                return field;
            }
        }
        return null;
    }

    /**
     * 获得rowKey
     *
     * @param entity 目标实体
     * @throws IllegalAccessException
     */
    private static <T> String getRowkey(T entity) throws IllegalAccessException {
        String obj = null;
        int count = 0;
        Class<?> clazz = entity.getClass();
        Field[] fields = clazz.getDeclaredFields();
        for (Field field : fields) {
            HRowKey hRowKey = field.getAnnotation(HRowKey.class);
            if (hRowKey != null) {
                count++;
                field.setAccessible(true);
                if (field.getType().isAssignableFrom(String.class)) {
                    Object o = field.get(entity);
                    if (o != null && !(o.toString() == null ||"".equals(o.toString()))) {
                        obj = o.toString();
                    } else {
                        throw HBaseClientException.exception("Hbase实体注解解析错误，" + field.getName() + "属性的值不能为空");
                    }

                } else {
                    throw HBaseClientException.exception("Hbase实体注解解析错误，" + field.getName() + "属性的类型必须为String");
                }
            }
        }

        if (count == 1) {
            return obj;
        } else {
            throw HBaseClientException.exception("Hbase实体注解解析错误，@HRowKey有且只能有一个");
        }
    }

    /**
     * 查询@HRowKey
     *
     * @param declaredFields
     * @return 目标属性对象Field
     */
    private static Field findRowKeyField(Field[] declaredFields) {
        for (Field field : declaredFields) {
            HRowKey rowkey = field.getAnnotation(HRowKey.class);
            if (rowkey != null) {
                return field;
            }
        }
        return null;
    }


    /**
     * 获取Put对象的信息
     *
     * @param t Put的实体类
     * @throws IllegalAccessException
     */
    private static  <T> Set<HbaseData> getHbaseDatas(T t) throws IllegalAccessException {

        Set<HbaseData> hbaseDatas = new HashSet<>();

        Class<?> clazz = t.getClass();
        Field[] fields = clazz.getDeclaredFields();
        for (Field field : fields) {
            HColumn annotation = field.getAnnotation(HColumn.class);
            if (annotation != null) {
                byte[] family = stringToBytes(annotation.family());
                byte[] qualifier = stringToBytes(annotation.name());
                byte[] value = null;
                field.setAccessible(true);
                Object obj = field.get(t);
                if (obj == null) {
                    continue;
                } else {
                    if (obj instanceof Date) {
                        Date date = (Date) obj;
                        value = com.zto.boot.hbase.util.Bytes.toBytes(date.getTime());
                    } else if (TypeUtil.isBooleanType(field)) {
                        value = com.zto.boot.hbase.util.Bytes.toBytes((Boolean) obj);
                    } else if (TypeUtil.isShortType(field)) {
                        value = com.zto.boot.hbase.util.Bytes.toBytes((Short) obj);
                    } else if (TypeUtil.isIntType(field)) {
                        value = com.zto.boot.hbase.util.Bytes.toBytes((Integer) obj);
                    } else if (TypeUtil.isLongType(field)) {
                        value = com.zto.boot.hbase.util.Bytes.toBytes((Long) obj);
                    } else if (TypeUtil.isFloatType(field)) {
                        value = com.zto.boot.hbase.util.Bytes.toBytes((Float) obj);
                    } else if (TypeUtil.isDoubleType(field)) {
                        value = com.zto.boot.hbase.util.Bytes.toBytes((Double) obj);
                    } else if (TypeUtil.isBigDecimalType(field)) {
                        value = com.zto.boot.hbase.util.Bytes.toBytes((BigDecimal) obj);
                    } else if (TypeUtil.isStringType(field)){
                        value = stringToBytes(obj.toString());
                    } else {
                        throw HBaseClientException.exception("Hbase实体注解解析错误，" + field.getName() + "属性的值类型不合法! 现只支持 [" + TypeUtil.getSuitableTypes() + "]" );
                    }
                }

                hbaseDatas.add(new HbaseData(family, qualifier, value));
            }
        }
        return hbaseDatas;
    }

    /**
     * 查询属性对象Field
     *
     * @param declaredFields 类定义的所有属性。
     * @param keyValue       查询条件
     * @return 属性对象Field
     */
    private static Field findField(Field[] declaredFields, KeyValue keyValue) {
        for (Field field : declaredFields) {
            HColumn hColumn = field.getAnnotation(HColumn.class);

            if (hColumn != null && hColumn.family().equals(new String(keyValue.family())) && hColumn.name().equals(new String(keyValue.qualifier()))) {
                return field;
            }
        }
        return null;
    }

    /**
     * 判断是否是包装类
     *
     * @param clz Class
     * @return
     */
    public static boolean isWrapClass(Class clz) {
        try {
            return ((Class) clz.getField("TYPE").get(null)).isPrimitive();
        } catch (Exception e) {
            return false;
        }
    }


    /**
     * String转 byte[]
     *
     * @param str
     * @return
     */
    public static byte[] stringToBytes(String str) {
        try {
            return str.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw HBaseClientException.exception("字符串序列化为byte[]异常{0}", e);
        }
    }

}
